package com.xiguthings.xiniu.iot.trigger.worker.controller;

import java.util.List;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSON;
import com.xiguthings.xiniu.iot.common.DeviceData;
import com.xiguthings.xiniu.iot.common.DeviceDataBody;
import com.xiguthings.xiniu.iot.common.DeviceDataHead;
import com.xiguthings.xiniu.iot.common.entity.TriggerRule;
import com.xiguthings.xiniu.iot.etce.TemplateEtcdClient;
import com.xiguthings.xiniu.iot.etce.TriggerEtcdClient;
import com.xiguthings.xiniu.iot.trigger.worker.kafka.KafkaClient;
import com.xiguthings.xiniu.iot.trigger.worker.trigger.Trigger;

/**
 * Receives raw device messages (originating from Kafka), converts them into
 * {@link DeviceData} objects, looks up the trigger rules that apply to them and
 * hands both to the {@link Trigger} for evaluation. It also exposes a thin
 * wrapper for publishing messages back through the {@link KafkaClient}.
 *
 * <p>Collaborators are injected by Spring through field injection.
 */
@Component
public class DataContrller {
	private static final Logger LOGGER = LoggerFactory.getLogger(DataContrller.class);

	/** Source of the trigger rules that apply to a piece of device data. */
	@Autowired
	private TriggerEtcdClient ruleEtcdClient;

	/** Injected template client; not referenced by any method of this class. */
	@Autowired
	private TemplateEtcdClient templateEtcdClient;

	/** Evaluates device data against a list of trigger rules. */
	@Autowired
	private Trigger trigger;

	/** Client used to publish messages to Kafka. */
	@Autowired
	private KafkaClient kafkaClient;

	/**
	 * Creates the controller. All collaborators are populated afterwards by
	 * Spring field injection, so nothing is initialised here.
	 */
	public DataContrller() {
		super();
	}

	/**
	 * Handles one message received from Kafka.
	 *
	 * <p>Blank messages are ignored. Messages that cannot be parsed, or whose
	 * head carries no deviceId, are logged and dropped instead of failing with a
	 * {@link NullPointerException}. For a valid message the matching trigger
	 * rules are looked up and, when a rule list is returned, passed to the
	 * trigger together with the data; otherwise an info line is logged.
	 *
	 * @param data the raw message text; may be {@code null} or blank
	 */
	public void receiveData(String data) {
		if (StringUtils.isBlank(data)) {
			return;
		}

		DeviceData deviceData = formatMessage(data);
		if (deviceData == null) {
			// formatMessage has already logged why the message was rejected.
			return;
		}
		LOGGER.debug("收到的数据：\n{}", deviceData);
		deviceData.dataStr = data;

		// The original code dereferenced the deviceId unconditionally; make the
		// requirement explicit so a message without it is dropped gracefully.
		if (deviceData.getHead().getDeviceId() == null) {
			LOGGER.error("接收到kafka的数据格式不正确。收到的是消息:{}", data);
			return;
		}

		// Rules are resolved from the device data. Earlier lookups by modelId and
		// by projectId were retired when the rule relationships changed.
		List<TriggerRule> matchedRules = ruleEtcdClient.getTrtiggerRuleByDeviceData(deviceData);
		if (matchedRules != null) {
			trigger.checkDeviceData(deviceData, matchedRules);
			return;
		}

		LOGGER.info("该条数据没有对应的触发规则：{}", data);
	}

	/**
	 * Publishes a message through the Kafka client after logging it.
	 *
	 * @param msg       the message payload to send
	 * @param topic     the destination topic
	 * @param partition the destination partition, passed through unchanged
	 */
	public void producerMessage(String msg, String topic, Integer partition) {
		LOGGER.info("触发了条件，发送消息{}", msg);
		kafkaClient.produceData(msg, topic, partition);
	}

	/**
	 * Converts a raw message into a {@link DeviceData}.
	 *
	 * <p>The message is split on the literal separator {@code ||}; the first
	 * part is parsed as the JSON head and the second as the JSON body. Any
	 * further parts are ignored.
	 *
	 * @param dataStr the raw message text
	 * @return the parsed data, or {@code null} when the message has fewer than
	 *         two parts, the head parses to {@code null}, or parsing throws
	 */
	private DeviceData formatMessage(String dataStr) {
		try {
			String[] partArr = dataStr.split("\\|\\|");
			if (partArr.length >= 2) {
				DeviceDataHead head = JSON.parseObject(partArr[0], DeviceDataHead.class);
				if (head != null) {
					DeviceData deviceData = new DeviceData();
					deviceData.setHead(head);
					deviceData.setBody(JSON.parseObject(partArr[1], DeviceDataBody.class));
					return deviceData;
				}
			}
			// Missing separator or empty head: structurally invalid message.
			LOGGER.error("接收到kafka的数据格式不正确。收到的是消息:{}", dataStr);
		} catch (Exception e) {
			// Pass the exception as the trailing argument so the cause and stack
			// trace are recorded alongside the offending message.
			LOGGER.error("接收到kafka的数据格式不正确。收到的是消息:{}", dataStr, e);
		}
		return null;
	}
}
